package com.boat.kafka.consume;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class that runs a single Kafka poll loop on a background thread and
 * hands each polled batch of record values to {@link #processMessage(List)}.
 *
 * <p>Lifecycle: configure the instance through the setters, call
 * {@link #init()} to start consuming, and call {@link #destroy()} to stop the
 * loop and release the consumer.
 *
 * <p>The consumer is declared as {@code KafkaConsumer<String, String>}, so the
 * supplied properties are expected to configure String key/value
 * deserializers.
 */
public abstract class KafkaMessageConsumer {

	private static final Logger logger = LoggerFactory.getLogger(KafkaMessageConsumer.class);

	private ExecutorService executors;
	private String topics;
	private int partitionsNum;

	/**
	 * Handles one batch of record values returned by a single poll.
	 *
	 * <p>Called on the consumer thread after every poll, including polls that
	 * returned no records (the list is then empty). Any exception thrown is
	 * logged and the offsets of the batch are still committed afterwards, so a
	 * failed batch is not redelivered.
	 *
	 * @param message values of the records in the polled batch, in iteration
	 *                order; records with a {@code null} value are omitted
	 */
	public abstract void processMessage(List<String> message);

	private int pollTime = 100;
	private Properties properties;

	/**
	 * Loop-control flag. Written by {@link #init()} / {@link #destroy()} and
	 * read by the consumer thread, hence volatile.
	 */
	private volatile boolean running;

	/**
	 * Creates the executor and submits the poll loop to it.
	 *
	 * <p>{@code partitionsNum} is used as both core and maximum pool size, so
	 * it must be positive; {@link ThreadPoolExecutor} rejects a maximum pool
	 * size of zero or less with an {@link IllegalArgumentException}.
	 */
	public void init() {
		executors = new ThreadPoolExecutor(partitionsNum, partitionsNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
		// Set before submitting so the worker never observes a stale "false".
		running = true;
		executors.submit(new Runnable() {
			public void run() {
				consumeLoop();
			}
		});
	}

	/**
	 * Polls, dispatches and commits until {@link #destroy()} clears the
	 * running flag, then closes the consumer.
	 */
	private void consumeLoop() {
		KafkaConsumer<String, String> consumer = null;
		try {
			consumer = new KafkaConsumer<String, String>(properties);
			consumer.subscribe(Arrays.asList(topics));
			while (running) {
				ConsumerRecords<String, String> records = consumer.poll(pollTime);
				List<String> messageList = new ArrayList<String>();
				for (ConsumerRecord<String, String> record : records) {
					// The deserializer already produced a String; re-encoding it
					// through the platform charset would only risk corrupting it.
					String value = record.value();
					if (value == null) {
						// Null values (e.g. tombstones) carry no payload to process.
						logger.warn("Skipping record with null value: topic={}, partition={}, offset={}",
								new Object[] { record.topic(), record.partition(), record.offset() });
						continue;
					}
					messageList.add(value);
				}
				try {
					processMessage(messageList);
				} catch (Exception e) {
					logger.error("数据处理失败", e);
					// Failed batches are only logged; offsets are committed below regardless.
				}
				consumer.commitSync();
			}
		} catch (RuntimeException e) {
			// Without this the failure would be hidden inside the discarded Future.
			logger.error("Kafka consumer loop terminated unexpectedly", e);
		} finally {
			if (consumer != null) {
				try {
					consumer.close();
				} catch (RuntimeException e) {
					logger.error("Failed to close Kafka consumer", e);
				}
			}
		}
	}

	/**
	 * Stops the poll loop and waits up to 10 seconds for the consumer thread
	 * to finish. Safe to call when {@link #init()} was never invoked.
	 *
	 * <p>The loop checks the flag once per iteration, so it exits after the
	 * in-flight poll and batch processing complete.
	 */
	public void destroy() {
		running = false;
		if (executors == null) {
			return;
		}
		executors.shutdown();
		try {
			if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
				logger.error("Timeout.... Ignore for this case");
			}
		} catch (InterruptedException ignored) {
			logger.error("Other thread interrupted this shutdown, ignore for this case.", ignored);
			// Preserve the interrupt status for the caller.
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * @param properties configuration passed to the {@link KafkaConsumer}
	 *                   constructor
	 */
	public void setProperties(Properties properties) {
		this.properties = properties;
	}

	/**
	 * @param topics topic name the consumer subscribes to; the string is
	 *               passed to {@code subscribe} as a single list element
	 */
	public void setTopics(String topics) {
		this.topics = topics;
	}

	/**
	 * @param partitionsNum core and maximum size of the executor's thread
	 *                      pool; must be positive
	 */
	public void setPartitionsNum(int partitionsNum) {
		this.partitionsNum = partitionsNum;
	}

	/**
	 * @param pollTime timeout passed to each {@code poll} call (default 100);
	 *                 presumably milliseconds, per the Kafka
	 *                 {@code poll(long)} contract
	 */
	public void setPollTime(int pollTime) {
		this.pollTime = pollTime;
	}
}
